package com.hao.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Word count over a bounded (file-backed) stream using the Flink DataStream API.
 *
 * <p>Reads lines from a text file, splits each line into whitespace-separated words, keys the
 * resulting {@code (word, 1L)} pairs by word, sums the counts and prints the results.
 */
public class BoundedStreamWordCount {

    /** Input file read when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH = "input/words.txt";

    /**
     * Builds and executes the word-count job.
     *
     * @param args optional; when present, {@code args[0]} overrides the input file path
     *     (otherwise {@code input/words.txt} is used)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the input file line by line.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStreamSource<String> lineDSS = env.readTextFile(inputPath);
        // 3. Turn each line into (word, 1L) pairs; the produced type is declared explicitly
        //    so Flink does not have to extract it from the generic signature.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap(new MyStreamFlapMap())
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by the word (tuple field f0).
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(data -> data.f0);
        // 5. Sum the counts (tuple field index 1) per word.
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);
        // 6. Print the results.
        result.print();
        // 7. Submit and run the job.
        env.execute();
    }

    /**
     * Splits a line of text into words and emits a {@code (word, 1L)} pair for each one.
     *
     * <p>Words are separated by any run of whitespace. Leading and trailing whitespace and
     * blank lines produce no output, so no empty-string "word" is ever counted. (The class
     * name is kept as-is for compatibility with existing references.)
     */
    public static class MyStreamFlapMap implements FlatMapFunction<String, Tuple2<String, Long>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
            // trim() first so leading whitespace does not yield an empty first token.
            String[] words = value.trim().split("\\s+");
            for (String word : words) {
                // A blank line splits into a single empty token; skip it.
                if (word.isEmpty()) {
                    continue;
                }
                out.collect(new Tuple2<>(word, 1L));
            }
        }
    }
}
